package com.atguigu.bigdata.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object SparkStreaming01_WordCount {
//  使用SparkStreaming 完成WordCount
  def main(args: Array[String]): Unit = {
//    spark配置对象
     val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming01_WordCount")
//   实时数据分析环境对象
//   采集周期，以指定的时间为周期采集实时数据
     val streamingContext = new StreamingContext(sparkConf,Seconds(60))
//   从指定的端口中采集数据  hadoop102上执行：nc -lk 9999
     val socketLineDStream = streamingContext.socketTextStream("10.21.13.181",9999)
//   将采集的数据进行分解（扁平化）
    val wordDStream = socketLineDStream.flatMap(line=>line.split(" "))
//   将数据进行结构的转换方便统计
    val mapDStream = wordDStream.map((_,1))
//map(x=>(x,1))
    val wordToSumDStream = mapDStream.reduceByKey(_+_)

//reduceByKey((x,y)=>x+y)
    wordToSumDStream.print()
//    不能停止采集
//    streamingContext.stop()
//   启动采集器
    streamingContext.start()
//    Driver等待采集器的执行
    streamingContext.awaitTermination()
  }
}
